use std::collections::BTreeMap;

// Code generated at build time and pulled in from the build output directory (`OUT_DIR`).
// Presumably these are the protobuf bindings for the Datadog agent payload -- confirm
// against the build script. Lints are silenced wholesale because the included file is
// not hand-written.
#[allow(warnings, clippy::pedantic, clippy::nursery)]
mod ddmetric_proto {
    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}

use ddmetric_proto::{
    metric_payload::{MetricSeries, MetricType},
    MetricPayload,
};
use tracing::info;
use vector::common::datadog::DatadogSeriesMetric;

use self::ddmetric_proto::metric_payload::{MetricPoint, Resource};

use super::*;

// Fake intake routes queried for series payloads. In this file the v1 route is read
// from the agent->vector fake intake and the v2 route from the agent-only fake intake.
const SERIES_ENDPOINT_V1: &str = "/api/v1/series";
const SERIES_ENDPOINT_V2: &str = "/api/v2/series";

// unique identification of a Series
//
// Used as the key of `SeriesIntake`. The derived `Ord` compares the fields in
// declaration order (name, then tags, then type), which fixes the iteration order
// of the intake map; do not reorder the fields without considering that.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
struct SeriesContext {
    // name of the metric as reported in the series
    metric_name: String,
    // tags attached to the series, kept in the order they were received
    tags: Vec<String>,
    // raw numeric metric type as carried by the series' `r#type` field
    r#type: i32,
}

// Time span a data point is attributed to, as `(start, end)` timestamps.
// See `get_time_bucket` for how the bounds are derived per metric type.
// The derived `Ord` sorts by start and then by end.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
struct TimeBucket(i64, i64);

/// Computes the time bucket a data point belongs to.
///
/// The bucket always ends at the point's timestamp. For rates it starts `interval`
/// earlier; for gauges and counts it starts at the timestamp itself.
///
/// # Panics
///
/// Panics if `metric_type` is `MetricType::Unspecified`.
fn get_time_bucket(point: &MetricPoint, interval: i64, metric_type: MetricType) -> TimeBucket {
    let end = point.timestamp;
    let start = match metric_type {
        MetricType::Unspecified => panic!("received an unspecified metric type"),
        MetricType::Rate => end - interval,
        MetricType::Gauge | MetricType::Count => end,
    };

    TimeBucket(start, end)
}

/// The values received for a single series, grouped by the time bucket they fall in.
/// Within a bucket, values are kept in the order they were encountered.
type TimeSeriesData = BTreeMap<TimeBucket, Vec<f64>>;

/// This type represents the massaged intake data collected from the upstream.
/// The idea is to store what was received in a way that lets us compare the parts
/// that are important to compare, while accounting for the parts that are not
/// guaranteed to line up.
///
/// For instance, the services that are running may start at different times, so the
/// timestamps (`TimeBucket`) of the data points received are not guaranteed to match up.
type SeriesIntake = BTreeMap<SeriesContext, TimeSeriesData>;

/// Massages the raw payloads into our intake structure.
///
/// Every series whose metric name starts with `foo_metric` is keyed by its
/// `SeriesContext` (name, tags, type); its point values are appended to the time
/// bucket computed by `get_time_bucket`. Series with any other name are ignored.
/// A retained series with no points still gets an (empty) entry in the result.
///
/// # Panics
///
/// Panics (via `get_time_bucket`) if a retained series with at least one point has
/// an unspecified metric type.
fn generate_series_intake(payloads: &[MetricPayload]) -> SeriesIntake {
    let mut intake = BTreeMap::new();

    for serie in payloads.iter().flat_map(|payload| payload.series.iter()) {
        // filter out the metrics we don't care about (ones not generated by the client)
        if !serie.metric.starts_with("foo_metric") {
            continue;
        }

        let ctx = SeriesContext {
            metric_name: serie.metric.clone(),
            tags: serie.tags.clone(),
            r#type: serie.r#type,
        };

        // The entry API does a single lookup and consumes the key, so no extra clone
        // of the context (or of the time bucket below) is needed.
        let buckets: &mut BTreeMap<TimeBucket, Vec<f64>> = intake.entry(ctx).or_default();

        for point in &serie.points {
            let tb = get_time_bucket(point, serie.interval, serie.r#type());
            buckets.entry(tb).or_default().push(point.value);
        }
    }

    intake
}

/// Runs the assertions that must hold for every set of payloads, regardless of the
/// pipeline they came from.
///
/// # Panics
///
/// Panics if the intake is empty, or if no series name starts with
/// `foo_metric.<kind>` for any of the expected kinds.
fn common_series_assertions(series: &SeriesIntake) {
    // NOTE: no count expected due to the in-app type being Rate
    // (https://docs.datadoghq.com/metrics/types/?tab=count#submission-types-and-datadog-in-app-types)
    const EXPECTED_KINDS: [&str; 4] = ["rate", "gauge", "set", "histogram"];

    // the emitter must have sent us something
    assert!(!series.is_empty());
    info!("metric series received: {}", series.len());

    // one flag per expected kind, flipped once a matching series name shows up
    let mut seen = [false; 4];

    for ctx in series.keys() {
        for (kind, was_seen) in EXPECTED_KINDS.iter().zip(seen.iter_mut()) {
            let prefix = format!("foo_metric.{}", kind);
            if ctx.metric_name.starts_with(&prefix) {
                info!("received {}", kind);
                *was_seen = true;
            }
        }
    }

    // every expected kind must have been observed at least once
    for (kind, was_seen) in EXPECTED_KINDS.iter().zip(seen.iter()) {
        assert!(*was_seen, "Didn't receive metric type {}", *kind);
    }
}

/// Converts a v1 series into the v2 `MetricSeries` representation, so both API
/// versions can be processed by the same intake logic.
impl From<&DatadogSeriesMetric> for MetricSeries {
    /// Builds a `MetricSeries` from `input`.
    ///
    /// - The host, when present, becomes a single `"host"` resource.
    /// - Each point's first element is used as the timestamp and the second as the value.
    /// - A missing interval becomes `0`; missing tags and source type name become empty.
    /// - `unit` is left empty and `metadata` unset.
    fn from(input: &DatadogSeriesMetric) -> Self {
        let mut resources = vec![];
        if let Some(host) = &input.host {
            resources.push(Resource {
                r#type: "host".to_string(),
                name: host.clone(),
            });
        }

        let points = input
            .points
            .iter()
            .map(|point| MetricPoint {
                value: point.1,
                timestamp: point.0,
            })
            .collect();

        let interval = input.interval.unwrap_or(0) as i64;

        // Numeric type codes stored in the v2 series' `r#type` field.
        let r#type = match input.r#type {
            vector::common::datadog::DatadogMetricType::Gauge => 3,
            vector::common::datadog::DatadogMetricType::Count => 1,
            vector::common::datadog::DatadogMetricType::Rate => 2,
        };

        MetricSeries {
            resources,
            metric: input.metric.clone(),
            tags: input.tags.clone().unwrap_or_default(),
            points,
            r#type,
            unit: String::new(),
            // Clone only the field we need rather than the entire input series.
            source_type_name: input.source_type_name.clone().unwrap_or_default(),
            interval,
            metadata: None,
        }
    }
}

/// Wraps each v1 series in its own v2 `MetricPayload`, converting the series through
/// the `From<&DatadogSeriesMetric>` implementation. The output preserves input order.
fn convert_v1_payloads_v2(input: &[DatadogSeriesMetric]) -> Vec<MetricPayload> {
    let mut converted = Vec::with_capacity(input.len());

    for metric in input {
        converted.push(MetricPayload {
            series: vec![metric.into()],
        });
    }

    converted
}

/// Flattens the JSON payloads into the list of series they contain.
///
/// # Panics
///
/// Panics if a payload's `data` is not a JSON array, or if an element of that array
/// cannot be deserialized into a `DatadogSeriesMetric`.
fn unpack_v1_series(in_payloads: &[FakeIntakePayloadJson]) -> Vec<DatadogSeriesMetric> {
    let mut unpacked: Vec<DatadogSeriesMetric> = Vec::new();

    for payload in in_payloads {
        let entries = payload.data.as_array().unwrap();
        for entry in entries {
            unpacked.push(serde_json::from_value(entry.clone()).unwrap());
        }
    }

    unpacked
}

/// Fetches the v1 series payloads from the fake intake at `address`, converts them to
/// the v2 representation, builds the series intake, and runs the common assertions on
/// it before returning it.
async fn get_v1_series_from_pipeline(address: String) -> SeriesIntake {
    info!("getting v1 series payloads");
    let response =
        get_fakeintake_payloads::<FakeIntakeResponseJson>(&address, SERIES_ENDPOINT_V1).await;

    info!("unpacking payloads");
    let v1_series = unpack_v1_series(&response.payloads);
    info!("converting payloads");
    let v2_payloads = convert_v1_payloads_v2(&v1_series);

    info!("generating series intake");
    let series_intake = generate_series_intake(&v2_payloads);

    common_series_assertions(&series_intake);

    info!("{:?}", series_intake);

    series_intake
}

/// Fetches the v2 series payloads from the fake intake at `address`, decodes them into
/// `MetricPayload`s, builds the series intake, and runs the common assertions on it
/// before returning it.
async fn get_v2_series_from_pipeline(address: String) -> SeriesIntake {
    info!("getting v2 series payloads");
    let raw_response =
        get_fakeintake_payloads::<FakeIntakeResponseRaw>(&address, SERIES_ENDPOINT_V2).await;

    info!("unpacking payloads");
    let metric_payloads = unpack_proto_payloads::<MetricPayload>(&raw_response);

    info!("generating series intake");
    let series_intake = generate_series_intake(&metric_payloads);

    common_series_assertions(&series_intake);

    info!("{:?}", series_intake);

    series_intake
}

/// Collects the series intake from the Agent-only pipeline and from the Agent->Vector
/// pipeline, then asserts that the two are consistent with each other.
///
/// # Panics
///
/// Panics if any of the consistency assertions below fails.
pub(super) async fn validate() {
    // Numeric metric type codes as stored in `SeriesContext::r#type`.
    const RATE: i32 = 2;
    const GAUGE: i32 = 3;

    info!("==== getting series data from agent-only pipeline ==== ");
    let agent_intake = get_v2_series_from_pipeline(fake_intake_agent_address()).await;

    info!("==== getting series data from agent-vector pipeline ====");
    let vector_intake = get_v1_series_from_pipeline(fake_intake_vector_address()).await;

    assert_eq!(
        agent_intake.len(),
        vector_intake.len(),
        "different number of unique Series contexts received"
    );

    // Summary of the assertions made below:
    //   - Each metric type gets the assertions relevant to the behavior expected of that
    //     type. For example, gauges are last-write-wins, while rates are summed over the
    //     time interval.
    //
    //   - The intake from each case (Agent only, and Agent->Vector) is stored in a data
    //     structure that lets us compare the overall shape of the data collected during
    //     the entire test, regardless of how requests were batched. The data is stored
    //     per time bucket rather than per request, and the time buckets are held in a
    //     BTreeMap, so the sort order is consistent even when the actual timestamps of
    //     the two cases differ (which they may, since there is no guarantee that the
    //     compose services started at the same time, nor that the Agent instances
    //     process at the same pace).
    //
    //     Together, this means that data sets passing these validations confirm that the
    //     Vector version used in the test case is not introducing inconsistencies in the
    //     data flowing between the Agent and the Datadog backend.

    for ((agent_ctx, agent_points), (vector_ctx, vector_points)) in
        agent_intake.iter().zip(vector_intake.iter())
    {
        assert_eq!(agent_ctx, vector_ctx, "Mismatch of series context");

        let metric_type = agent_ctx.r#type;

        // Dogstatsd emits counters but the output type from the Agent is Rate.
        // https://docs.datadoghq.com/metrics/types/?tab=rate#submission-types-and-datadog-in-app-types
        assert!(
            metric_type == RATE || metric_type == GAUGE,
            "Metric type should always be rate or gauge."
        );

        if metric_type == GAUGE {
            // gauge: last one wins.
            // Only the values in the final time bucket are compared, because consistent
            // sampling of every individual value can't be guaranteed.
            let (_, agent_last) = agent_points
                .last_key_value()
                .expect("should have received points");
            let (_, vector_last) = vector_points
                .last_key_value()
                .expect("should have received points");

            assert_eq!(agent_last, vector_last, "Mismatch of gauge data");
        } else if metric_type == RATE {
            // rate: summation.
            // Each bucket is summed first and the bucket totals are then added up, so
            // both sides accumulate in the same order.
            let agent_sum: f64 = agent_points
                .values()
                .map(|bucket_values| bucket_values.iter().sum::<f64>())
                .sum();

            let vector_sum: f64 = vector_points
                .values()
                .map(|bucket_values| bucket_values.iter().sum::<f64>())
                .sum();

            assert_eq!(agent_sum, vector_sum, "Mismatch of rate data");
        }
    }
}
